package top.choviwu.garbage.sort.service.asyn;

import java.util.Observable;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
* @Description:  {@link AbstractEventHandler#handle(Object)}
* @author choviwu
*/

public class EventQueue<E> extends Observable {

	private  static ExecutorService executor ;
	/**
	* @Description: 这里使用阻塞队列
	*/

	private final Queue<E> queue = new LinkedBlockingQueue<E>();

	public EventQueue(final AbstractEventHandler<E> handler, int poolSize) {

		super();
		init(handler, poolSize);
	}

	public void init(final AbstractEventHandler<E> handler, int poolSize){
		//创建一个线程池 用来异步消费队列
		executor  = new ThreadPoolExecutor(poolSize,poolSize,0L,
				TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new EventThreadFactory());
		// 设置队列信息
		handler.setQueue(this);
		//观察者模式 用来监听队列的动态通知 如果队列有变则进行取出消费
		this.addObserver((o,v)->executor.execute(handler));
	}
	public void offer(E e) {

		setChanged();
		synchronized (queue) {
			queue.add(e);
		}
		notifyObservers();

	}

	protected E poll() {

		if (queue.isEmpty()){
			return null;
		}
		/**
		* @Description: 拿到第一条消息
		*/
		return queue.poll();

	}

	private static class EventThreadFactory implements ThreadFactory{
		AtomicLong number = new AtomicLong(1);
		private static final String PRE = "event_";
		@Override
		public Thread newThread(Runnable r) {
			Thread thread = new Thread(r,PRE+number);
			thread.setDaemon(false);
			thread.setPriority(Thread.NORM_PRIORITY);
			return thread;
		}
	}
}
